/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.inlong.sort.parser;

import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.TimestampFormatInfo;
import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Tests that {@link FlinkSqlParser} can parse and execute streams built around
 * {@link SqlServerLoadNode} and {@link SqlServerExtractNode}.
 */
public class SqlServerNodeSqlParseTest extends AbstractTestBase {

    /**
     * Create a field backed by a fresh {@link LongFormatInfo}.
     */
    private static FieldInfo longField(String name) {
        return new FieldInfo(name, new LongFormatInfo());
    }

    /**
     * Create a field backed by a fresh {@link StringFormatInfo}.
     */
    private static FieldInfo stringField(String name) {
        return new FieldInfo(name, new StringFormatInfo());
    }

    /**
     * Create a field backed by a fresh {@link TimestampFormatInfo}.
     */
    private static FieldInfo timestampField(String name) {
        return new FieldInfo(name, new TimestampFormatInfo());
    }

    /**
     * Create the two-column (id, name) schema shared by the mysql source and the sqlserver sink.
     * Every call returns a new list holding new field instances.
     */
    private static List<FieldInfo> idAndNameFields() {
        return Arrays.asList(longField("id"), stringField("name"));
    }

    /**
     * Create the seven-column schema shared by the sqlserver source and the kafka sink.
     * Every call returns a new list holding new field instances.
     */
    private static List<FieldInfo> sqlServerSourceFields() {
        return Arrays.asList(
                longField("id"),
                stringField("val_char"),
                timestampField("proctime"),
                stringField("database_name"),
                stringField("table_name"),
                timestampField("op_ts"),
                stringField("schema_name"));
    }

    /**
     * Collect the ids of the given nodes, preserving their order.
     */
    private static List<String> idsOf(List<Node> nodes) {
        return nodes.stream().map(Node::getId).collect(Collectors.toList());
    }

    /**
     * Build the mysql extract node used as the source of the load test.
     */
    private MySqlExtractNode buildMySQLExtractNode(String id) {
        // The only entry handed to the node through this map is "append-mode".
        Map<String, String> options = new HashMap<>();
        options.put("append-mode", "true");
        return new MySqlExtractNode(id, "mysql", idAndNameFields(),
                null, options, "id",
                Collections.singletonList("work1"), "localhost", "root", "password",
                "inlong", null, null,
                null, null);
    }

    /**
     * Build the sqlserver extract node used as the source of the extract test.
     */
    private SqlServerExtractNode buildSqlServerExtractNode(String id) {
        return new SqlServerExtractNode(id, "sqlserver_out", sqlServerSourceFields(), null, null,
                null, "localhost", 1433, "SA", "INLONG*123",
                "column_type_test", "dbo", "full_types", null);
    }

    /**
     * Build the kafka load node used as the sink of the extract test.
     * Each relation maps a field onto an identically named and typed field.
     */
    private KafkaLoadNode buildKafkaNode(String id) {
        List<FieldInfo> fields = sqlServerSourceFields();
        // Both sides of every relation get their own FieldInfo instance.
        List<FieldRelation> relations = Arrays.asList(
                new FieldRelation(longField("id"), longField("id")),
                new FieldRelation(stringField("val_char"), stringField("val_char")),
                new FieldRelation(timestampField("proctime"), timestampField("proctime")),
                new FieldRelation(stringField("database_name"), stringField("database_name")),
                new FieldRelation(stringField("table_name"), stringField("table_name")),
                new FieldRelation(timestampField("op_ts"), timestampField("op_ts")),
                new FieldRelation(stringField("schema_name"), stringField("schema_name")));
        return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
                "sqlserver", "localhost:9092",
                new JsonFormat(), null,
                null, "id");
    }

    /**
     * Build the sqlserver load node used as the sink of the load test,
     * including the "dirty.*" entries placed in its properties map.
     */
    private SqlServerLoadNode buildSqlServerLoadNode(String id) {
        List<FieldInfo> fields = idAndNameFields();
        List<FieldRelation> relations = Arrays.asList(
                new FieldRelation(longField("id"), longField("id")),
                new FieldRelation(stringField("name"), stringField("name")));
        // LinkedHashMap keeps the entries in the order they are inserted below.
        Map<String, String> dirtyOptions = new LinkedHashMap<>();
        dirtyOptions.put("dirty.side-output.connector", "log");
        dirtyOptions.put("dirty.ignore", "true");
        dirtyOptions.put("dirty.side-output.enable", "true");
        dirtyOptions.put("dirty.side-output.format", "csv");
        dirtyOptions.put("dirty.side-output.labels",
                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
        return new SqlServerLoadNode(id, "sqlserver_out", fields, relations, null, null, 1,
                dirtyOptions, "jdbc:sqlserver://localhost:1433;databaseName=column_type_test", "SA",
                "INLONG*123", "dbo", "work1", "id");
    }

    /**
     * Build a node relation from the ids of the input nodes to the ids of the output nodes.
     */
    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
        return new NodeRelation(idsOf(inputs), idsOf(outputs));
    }

    /**
     * Create a streaming-mode table environment with parallelism 1 and
     * checkpointing enabled with the value 10000.
     */
    private StreamTableEnvironment createTableEnvironment() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        return StreamTableEnvironment.create(env, settings);
    }

    /**
     * Wrap the source and sink into a single-stream group, parse it with
     * {@link FlinkSqlParser} and assert that {@code tryExecute()} reports success.
     */
    private void assertParsedStreamExecutes(StreamTableEnvironment tableEnv, Node source, Node sink)
            throws Exception {
        List<Node> nodes = Arrays.asList(source, sink);
        NodeRelation relation = buildNodeRelation(
                Collections.singletonList(source),
                Collections.singletonList(sink));
        StreamInfo streamInfo = new StreamInfo("1L", nodes, Collections.singletonList(relation));
        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
        Assert.assertTrue(parser.parse().tryExecute());
    }

    /**
     * Test extract data from mysql and load data to sqlserver.
     */
    @Test
    public void testSqlServerLoad() throws Exception {
        StreamTableEnvironment tableEnv = createTableEnvironment();
        Node source = buildMySQLExtractNode("1");
        Node sink = buildSqlServerLoadNode("2");
        assertParsedStreamExecutes(tableEnv, source, sink);
    }

    /**
     * Test extract data from sqlserver and load data to kafka.
     */
    @Test
    public void testSqlServerExtract() throws Exception {
        StreamTableEnvironment tableEnv = createTableEnvironment();
        Node source = buildSqlServerExtractNode("1");
        Node sink = buildKafkaNode("2");
        assertParsedStreamExecutes(tableEnv, source, sink);
    }

}
